feat: allow inserting subschemas #3041
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3041 +/- ##
==========================================
- Coverage 77.16% 77.06% -0.10%
==========================================
Files 240 240
Lines 79764 80417 +653
Branches 79764 80417 +653
==========================================
+ Hits 61548 61975 +427
- Misses 15071 15265 +194
- Partials 3145 3177 +32
Great work. Thanks for cleaning up all that schema / field comparison logic too!
@@ -476,13 +432,13 @@ impl Field {
     ///
     /// If the ids are `[2]`, then this will include the parent `0` and the
     /// child `3`.
-    pub(crate) fn project_by_ids(&self, ids: &[i32]) -> Option<Self> {
+    pub(crate) fn project_by_ids(&self, ids: &[i32], include_all_children: bool) -> Option<Self> {
         let children = self
Super minor nit: I'm guessing the optimizer catches this but it might be faster to only calculate children if we need it...
pub(crate) fn project_by_ids(&self, ids: &[i32], include_all_children: bool) -> Option<Self> {
if !ids.contains(&self.id) {
return None;
}
We actually don't want to early return, because even if a field isn't selected, we want to check if it has children that are.
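To illustrate why the early return would drop nested selections, here is a minimal sketch on a simplified, hypothetical `Field` type (only `id` and `children`; this is not the PR's implementation, and the exact semantics of `include_all_children` here are an assumption). Children are always projected first, and a field is kept if it is selected itself or if any descendant is.

```rust
// Hypothetical, simplified stand-in for the real `Field` type.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    id: i32,
    children: Vec<Field>,
}

impl Field {
    // Keeps a field when it is selected OR when any descendant is selected.
    // Assumption: `include_all_children` keeps every child of a selected field.
    fn project_by_ids(&self, ids: &[i32], include_all_children: bool) -> Option<Field> {
        let selected = ids.contains(&self.id);
        let children: Vec<Field> = if selected && include_all_children {
            self.children.clone()
        } else {
            // No early return: recurse even when `self` is not selected.
            self.children
                .iter()
                .filter_map(|c| c.project_by_ids(ids, include_all_children))
                .collect()
        };
        if selected || !children.is_empty() {
            Some(Field { id: self.id, children })
        } else {
            None
        }
    }
}

// Sample tree: 0 -> [1, 2 -> [3]]
fn sample_tree() -> Field {
    let leaf = |id| Field { id, children: vec![] };
    Field {
        id: 0,
        children: vec![leaf(1), Field { id: 2, children: vec![leaf(3)] }],
    }
}
```

With this sketch, `sample_tree().project_by_ids(&[3], false)` keeps the chain `0 -> 2 -> 3`, whereas a version that returned `None` as soon as `ids` did not contain `0` would lose field `3` entirely.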
rust/lance/src/dataset/fragment.rs
Outdated
// Check if there are any fields that are not in any data files
let field_ids_in_files = opened_files
    .iter()
    .flat_map(|r| r.projection().fields_pre_order().map(|f| f.id))
    .filter(|id| *id >= 0)
    .collect::<HashSet<_>>();
let mut missing_fields = projection.field_ids();
missing_fields.retain(|f| !field_ids_in_files.contains(f) && *f >= 0);
if !missing_fields.is_empty() {
    let missing_projection = projection.project_by_ids(&missing_fields, true);
    let null_reader = NullReader::new(Arc::new(missing_projection), opened_files[0].len());
    opened_files.push(Box::new(null_reader));
}
This is neat!
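The set logic in the snippet above can be sketched in isolation. This is a standalone illustration that uses plain `i32` ids in place of the real readers and schema types; `missing_field_ids` is a hypothetical helper name, not a function in the codebase.

```rust
use std::collections::HashSet;

// Returns the projected field ids that no data file provides.
// Negative ids are skipped, mirroring the `>= 0` filters in the snippet.
fn missing_field_ids(projected: &[i32], ids_per_file: &[Vec<i32>]) -> Vec<i32> {
    // Union of all non-negative field ids found across the data files.
    let in_files: HashSet<i32> = ids_per_file
        .iter()
        .flatten()
        .copied()
        .filter(|id| *id >= 0)
        .collect();
    // Keep only the projected ids that are absent from every file.
    projected
        .iter()
        .copied()
        .filter(|id| *id >= 0 && !in_files.contains(id))
        .collect()
}
```

Any ids returned here are the ones that would be served by a null-producing reader rather than by a data file.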
In #2639 we added support for *updating* subcolumns. In #3041 we added support for *inserting* subcolumns. This PR adds support for upserting them (or doing insert-if-not-exists).

Closes #2904

## Example

```python
import pyarrow as pa
import lance

table = pa.table({
    "id": range(3),
    "a": [1.0, 2.0, 3.0],
    "c": ["x", "x", "x"]
})
dataset = lance.write_dataset(table, "example")

# Upsert: when_matched_update_all + when_not_matched_insert_all
new_data = pa.table({
    "id": [2, 3],
    "c": ["y", "y"]
})
(
    dataset
    .merge_insert(on="id")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute(new_data)
)
dataset.to_table().to_pandas()
```

```
   id    a  c
0   0  1.0  x
1   1  2.0  x
2   2  3.0  y
3   3  NaN  y
```

```python
# Insert-if-not-exists: when_not_matched_insert_all
new_data = pa.table({
    "id": [3, 4],
    "c": ["z", "z"]
})
(
    dataset
    .merge_insert(on="id")
    .when_not_matched_insert_all()
    .execute(new_data)
)
dataset.to_table().to_pandas()
```

```
   id    a  c
0   0  1.0  x
1   1  2.0  x
2   2  3.0  y
3   3  NaN  y
4   4  NaN  z
```
Allow inserting a subset of the columns in the schema, provided the missing columns are nullable. Missing columns are filled with null values. This also works with nested fields.
For example:
Closes #3016
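The null-filling rule in the description can be sketched as follows. This is a toy model, not the Lance implementation: `ColumnSpec`, `Column`, and `fill_missing_columns` are hypothetical names, columns are flat `Vec<Option<i64>>` values, and nested fields are not modeled.

```rust
use std::collections::HashMap;

// Hypothetical, minimal schema entry: a column name and its nullability.
struct ColumnSpec {
    name: &'static str,
    nullable: bool,
}

// Toy column representation: `None` stands for a null value.
type Column = Vec<Option<i64>>;

// Fills every schema column absent from `batch` with nulls.
// Returns an error if an absent column is not nullable.
fn fill_missing_columns(
    schema: &[ColumnSpec],
    mut batch: HashMap<String, Column>,
    num_rows: usize,
) -> Result<HashMap<String, Column>, String> {
    for spec in schema {
        if batch.contains_key(spec.name) {
            continue;
        }
        if !spec.nullable {
            return Err(format!("missing non-nullable column `{}`", spec.name));
        }
        batch.insert(spec.name.to_string(), vec![None; num_rows]);
    }
    Ok(batch)
}
```

Under these assumptions, a batch that supplies only `id` for a schema of non-nullable `id` and nullable `a` comes back with `a` set to all nulls, while a batch that omits `id` is rejected.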